# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
from hysop.tools.htypes import check_instance, first_not_None
from hysop.tools.numerics import is_fp
from hysop.numerics.odesolvers.runge_kutta_coeffs import (
Itype,
Qtype,
rk_params,
available_methods,
)
# default methods to dump rational and integers expression to string
[docs]
def Q_dump(q):
return f"({q.numerator}/{q.denominator})"
[docs]
def I_dump(i):
return f"{i}"
[docs]
class TimeIntegrator:
def __str__(self):
return self.name()
[docs]
class RungeKutta(TimeIntegrator):
pass
# Tni = Tn + alpha_i*dt
# Xni = Xn + dt*sum(j=0,i-1,gamma_ij*Knj)
# Kni = F(Tni,Xni)
# X_n+1 = Xn + dt*sum(i=0,M-1,beta_i*Ki)
[docs]
class RhsFunction:
def __call__(self, out, X, t, dt, **kwds):
msg = "{}.__call__() has not been overrided."
msg = msg.format(type(self).__name__)
raise NotImplementedError(msg)
[docs]
class ExplicitRungeKutta(RungeKutta):
def __init__(self, method, I_dump=I_dump, Q_dump=Q_dump):
if method not in available_methods():
msg = "{} was not implemented yet! Valid values are {}."
msg = msg.format(method, available_methods.keys())
raise ValueError(msg)
params = rk_params[method]
self.method = method
self.order = params["order"]
self.stages = params["stages"]
self.alpha = params["alpha"]
self.beta = params["beta"]
self.gamma = params["gamma"]
self.I_dump = I_dump
self.Q_dump = Q_dump
self._hashable = True
[docs]
def __call__(
self, Xin, RHS, dt, t=0.0, views=None, Xout=None, buffers=None, **kwds
):
"""Buffers = dict of (nb_stages+1) np.ndarray of size compatible with Xin"""
check_instance(Xin, dict, keys=str, values=np.ndarray)
varnames = Xin.keys()
views = first_not_None(views, {k: Ellipsis for (k, v) in Xin.items()})
Xout = first_not_None(
Xout, {k: np.empty_like(v[views[k]]) for (k, v) in Xin.items()}
)
buffers = first_not_None(
buffers,
{
k: tuple(np.empty_like(v) for i in range(self.stages + 1))
for (k, v) in Xin.items()
},
)
check_instance(views, dict, keys=str, values=(type(Ellipsis), slice, tuple))
check_instance(Xout, dict, keys=str, values=np.ndarray)
check_instance(buffers, dict, keys=str, values=tuple)
assert callable(RHS), type(RHS)
if __debug__:
compute_shape = next(iter(Xout.values())).shape
assert varnames == Xout.keys()
assert varnames == views.keys()
assert varnames == buffers.keys()
for vname in varnames:
ivar = Xin[vname]
ovar = Xout[vname]
view = views[vname]
assert is_fp(ivar.dtype), ivar.dtype
assert is_fp(ovar.dtype), ovar.dtype
assert np.all(
ivar[view].shape == compute_shape
), f"{ivar[view].shape} vs {compute_shape}"
assert np.all(ovar.shape == compute_shape), ovar.shape
assert len(buffers[vname]) == self.stages + 1, self.stages + 1
for buf in buffers[vname]:
assert is_fp(buf.dtype), buf.dtype
assert np.all(buf.shape == ivar.shape)
Xtmp = {k: v[0] for (k, v) in buffers.items()}
K = tuple(
{k: v[i] for (k, v) in buffers.items()} for i in range(1, self.stages + 1)
)
for i in range(self.stages):
ai = self.alpha[i]
ti = t + float(ai) * dt
if i == 0:
RHS(out=K[i], X=Xin, t=ti, step=i, steps=self.stages, **kwds)
else:
for vname in varnames:
Xtmp[vname][...] = 0
for j in range(i):
gij = float(self.gamma[i - 1, j])
Xtmp[vname] += float(gij) * K[j][vname]
Xtmp[vname] *= dt
Xtmp[vname] += Xin[vname]
RHS(out=K[i], X=Xtmp, t=ti, step=i, steps=self.stages, **kwds)
for vname in varnames:
Xout[vname][...] = 0
for i, bi in enumerate(self.beta):
Xout[vname] += float(bi) * K[i][vname][views[vname]]
Xout[vname] *= dt
Xout[vname] += Xin[vname][views[vname]]
return Xout
def __eq__(self, other):
if not isinstance(other, ExplicitRungeKutta):
return NotImplemented
eq = self.method == other.method
# eq &= self.I_dump == other.I_dump
# eq &= self.Q_dump == other.Q_dump
return eq
def __ne__(self, other):
eq = self == other
if isinstance(eq, ExplicitRungeKutta):
return NotImplemented
return not eq
def __hash__(self):
return hash(self.method)
[docs]
def name(self):
return self.method
[docs]
def dump(self, val):
if isinstance(val, Itype):
return self.I_dump(val)
elif isinstance(val, Qtype):
return self.Q_dump(val)
else:
return f"{val}"
# Tni = Tn + alpha_i*dt
[docs]
def Tni(self, i, Tn, dt):
alpha = self.alpha[i]
if alpha == 0:
return f"{Tn}"
elif alpha == 1:
return f"{Tn} + {dt}"
else:
return f"{Tn} + {self.dump(alpha)}*{dt}"
# Xni = Xn + dt*sum(j=0,i-1,gamma_ij*Knj)
[docs]
def Xni_sum(self, i):
_sum = ""
gamma = self.gamma[i - 1, :]
for j in range(i):
g = gamma[j]
if g == 0:
continue
elif g == 1:
_sum += f"K[{j}]"
else:
_sum += f"{self.dump(g)}*K[{j}]"
_sum += " + "
_sum = _sum[:-3]
return _sum
[docs]
def Xni(self, i, Xn, dt):
if i > 0:
_sum = self.Xni_sum(i)
return f"{Xn} + {dt}*({_sum})"
else:
return f"{Xn}"
# X_n+1 = Xn + dt*sum(i=0,M-1,beta_i*Ki)
[docs]
def step_sum(self):
_sum = ""
for i in range(self.stages):
beta = self.beta[i]
if beta == 0:
continue
elif beta == 1:
_sum += f"K[{i}]"
else:
_sum += f"{self.dump(beta)}*K[{i}]"
_sum += " + "
_sum = _sum[:-3]
return _sum
[docs]
def step(self, Xn, dt):
return f"{Xn} + {dt}*({self.step_sum()})"
Euler = ExplicitRungeKutta("Euler")
RK1 = ExplicitRungeKutta("RK1")
RK2 = ExplicitRungeKutta("RK2")
RK3 = ExplicitRungeKutta("RK3")
RK4 = ExplicitRungeKutta("RK4")
RK4_38 = ExplicitRungeKutta("RK4_38")
if __name__ == "__main__":
R = ExplicitRungeKutta("RK4")
for i in range(4):
print(R.Tni(i, Tn="To", dt="dt"))
print()
for i in range(4):
print(R.Xni(i, Xn="Xo", dt="dt"))
print()
print(R.step(Xn="Xo", dt="dt"))
Xin = {"a": np.random.rand(10, 10).astype(np.float32)}
Xout = {"a": np.empty_like(Xin["a"])}
print("\nRHS=0")
class Rhs(RhsFunction):
def __call__(self, out, X, t, dt, **kwds):
out["a"][...] = 0
rhs = Rhs()
for Integrator in (Euler, RK2, RK3, RK4, RK4_38):
print(f" {Integrator.name().ljust(8)}", end=" ")
dt = 0.1
Integrator(Xin, rhs, dt=dt, Xout=Xout)
d = np.max(np.abs(Xout["a"] - Xin["a"]))
print(f" d={d}")
assert d < 1e-7, d
print("\nRHS=1")
def rhs(out, X, t, dt, **kwds):
out["a"][...] = 1
for Integrator in (Euler, RK2, RK3, RK4, RK4_38):
print(f" {Integrator.name().ljust(8)}", end=" ")
dt = 0.1
Integrator(Xin, rhs, dt=dt, Xout=Xout)
d = np.max(np.abs((Xout["a"] - Xin["a"]) - dt))
print(f" d={d}")
assert d < 1e-7, d
print("\nRHS=cos(t)")
def rhs(out, X, t, dt, **kwds):
out["a"][...] = np.cos(t)
for Integrator in (Euler, RK2, RK3, RK4, RK4_38):
print(f" {Integrator.name().ljust(8)}", end=" ")
dt = 0.1
Integrator(Xin, rhs, dt=dt, Xout=Xout)
alpha = np.asarray(tuple(float(x) for x in Integrator.alpha), dtype=np.float32)
beta = np.asarray(tuple(float(x) for x in Integrator.beta), dtype=np.float32)
dX = np.dot(np.cos(alpha * dt), beta)
d = np.max(np.abs((Xout["a"] - Xin["a"]) - dX * dt))
print(f" d={d}")
assert d < 1e-7, d
print("\nRHS=f(x)")
def rhs(out, X, t, dt, **kwds):
out["a"][...] = 0.01 * X["a"] + 0.02
for Integrator in (Euler, RK2, RK3, RK4, RK4_38):
print(f" {Integrator.name().ljust(8)}", end=" ")
dt = 0.1
Integrator(Xin, rhs, dt=dt, Xout=Xout)
alpha = np.asarray(tuple(float(x) for x in Integrator.alpha), dtype=np.float32)
beta = np.asarray(tuple(float(x) for x in Integrator.beta), dtype=np.float32)
K = [
None,
] * Integrator.stages
beta = np.asarray(tuple(float(x) for x in Integrator.beta), dtype=np.float32)
for i in range(Integrator.stages):
if i == 0:
X = Xin["a"].copy()
else:
X[...] = 0
for j in range(i):
gij = float(Integrator.gamma[i - 1, j])
X[...] += float(gij) * K[j]
X[...] *= dt
X[...] += Xin["a"]
K[i] = 0.01 * X + 0.02
dX = np.zeros_like(X)
for i, bi in enumerate(beta):
dX += bi * K[i]
d = np.max(np.abs((Xout["a"] - Xin["a"]) - dX * dt))
print(f" d={d}")
assert d < 1e-7, d
print("\nRHS=f(x, t)")
def rhs(out, X, t, dt, **kwds):
out["a"][...] = 0.01 * X["a"] + 2 * t
for Integrator in (Euler, RK2, RK3, RK4, RK4_38):
print(f" {Integrator.name().ljust(8)}", end=" ")
dt = 0.1
Integrator(Xin, rhs, dt=dt, Xout=Xout)
alpha = np.asarray(tuple(float(x) for x in Integrator.alpha), dtype=np.float32)
beta = np.asarray(tuple(float(x) for x in Integrator.beta), dtype=np.float32)
K = [
None,
] * Integrator.stages
alpha0 = np.asarray(tuple(float(x) for x in Integrator.alpha), dtype=np.float32)
beta = np.asarray(tuple(float(x) for x in Integrator.beta), dtype=np.float32)
for i in range(Integrator.stages):
ti = 0 + alpha[i] * dt
if i == 0:
X = Xin["a"].copy()
else:
X[...] = 0
for j in range(i):
gij = float(Integrator.gamma[i - 1, j])
X[...] += float(gij) * K[j]
X[...] *= dt
X[...] += Xin["a"]
K[i] = 0.01 * X + 2 * ti
dX = np.zeros_like(X)
for i, bi in enumerate(beta):
dX += bi * K[i]
d = np.max(np.abs((Xout["a"] - Xin["a"]) - dX * dt))
print(f" d={d}")
assert d < 1e-7, d